1pub mod acp;
15pub mod health;
16pub mod persistent;
17pub mod pidregistry;
18pub mod process_tree;
19pub mod session_recovery;
20pub mod session_stats;
21pub mod shell;
22pub mod subprocess;
23pub mod watchdog;
24
25use std::any::Any;
26use std::collections::HashMap;
27use std::sync::{Arc, RwLock};
28
29use serde::{Deserialize, Serialize};
30
31use super::eventbus::EventBus;
32use super::storage::filestore::FileStore;
33use super::storage::wstore::WaveStore;
34use super::obj::{Block, MetaMapType, TermSize};
35use super::wps::Broker;
36
37pub const STATUS_INIT: &str = "init";
40pub const STATUS_RUNNING: &str = "running";
41pub const STATUS_DONE: &str = "done";
42
43pub const BLOCK_CONTROLLER_SHELL: &str = "shell";
46pub const BLOCK_CONTROLLER_CMD: &str = "cmd";
47pub const BLOCK_CONTROLLER_TSUNAMI: &str = "tsunami";
48pub const BLOCK_CONTROLLER_SUBPROCESS: &str = "subprocess";
49pub const BLOCK_CONTROLLER_PERSISTENT: &str = "persistent";
50pub const BLOCK_CONTROLLER_ACP: &str = "acp";
51
52pub const META_KEY_CONTROLLER: &str = "controller";
55pub const META_KEY_CONNECTION: &str = "connection";
56pub const META_KEY_CMD: &str = "cmd";
57pub const META_KEY_CMD_CWD: &str = "cmd:cwd";
58#[allow(dead_code)]
59pub const META_KEY_CMD_SHELL: &str = "cmd:shell";
60pub const META_KEY_CMD_ARGS: &str = "cmd:args";
61pub const META_KEY_CMD_ENV: &str = "cmd:env";
62#[allow(dead_code)]
63pub const META_KEY_CMD_JWT: &str = "cmd:jwt";
64pub const META_KEY_CMD_RUN_ON_START: &str = "cmd:runonstart";
65pub const META_KEY_CMD_RUN_ONCE: &str = "cmd:runonce";
66pub const META_KEY_CMD_CLEAR_ON_START: &str = "cmd:clearonstart";
67pub const META_KEY_CMD_CLOSE_ON_EXIT: &str = "cmd:closeonexit";
68pub const META_KEY_CMD_CLOSE_ON_EXIT_FORCE: &str = "cmd:closeonexitforce";
69pub const META_KEY_CMD_CLOSE_ON_EXIT_DELAY: &str = "cmd:closeonexitdelay";
70#[allow(dead_code)]
71pub const META_KEY_CMD_INIT_SCRIPT: &str = "cmd:initscript";
72#[allow(dead_code)]
73pub const META_KEY_CMD_INIT_SCRIPT_BASH: &str = "cmd:initscript.bash";
74#[allow(dead_code)]
75pub const META_KEY_CMD_INIT_SCRIPT_ZSH: &str = "cmd:initscript.zsh";
76#[allow(dead_code)]
77pub const META_KEY_CMD_INIT_SCRIPT_FISH: &str = "cmd:initscript.fish";
78#[allow(dead_code)]
79pub const META_KEY_CMD_INIT_SCRIPT_PWSH: &str = "cmd:initscript.pwsh";
80#[allow(dead_code)]
81pub const META_KEY_TERM_LOCAL_SHELL_PATH: &str = "term:localshellpath";
82#[allow(dead_code)]
83pub const META_KEY_TERM_LOCAL_SHELL_OPTS: &str = "term:localshellopts";
84
85#[allow(dead_code)]
89pub const DEFAULT_TIMEOUT_MS: u64 = 2000;
90
91#[allow(dead_code)]
93pub const DEFAULT_GRACEFUL_KILL_WAIT_MS: u64 = 400;
94
95#[derive(Debug, Clone)]
100pub struct BlockInputUnion {
101 pub input_data: Option<Vec<u8>>,
103 pub sig_name: Option<String>,
105 pub term_size: Option<TermSize>,
107}
108
109impl BlockInputUnion {
110 pub fn data(data: Vec<u8>) -> Self {
111 Self {
112 input_data: Some(data),
113 sig_name: None,
114 term_size: None,
115 }
116 }
117
118 pub fn signal(name: &str) -> Self {
119 Self {
120 input_data: None,
121 sig_name: Some(name.to_string()),
122 term_size: None,
123 }
124 }
125
126 pub fn resize(size: TermSize) -> Self {
127 Self {
128 input_data: None,
129 sig_name: None,
130 term_size: Some(size),
131 }
132 }
133}
134
135fn is_false(v: &bool) -> bool {
136 !v
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize, Default)]
143pub struct BlockControllerRuntimeStatus {
144 pub blockid: String,
145 #[serde(default)]
146 pub version: i32,
147 #[serde(default, skip_serializing_if = "String::is_empty")]
148 pub shellprocstatus: String,
149 #[serde(default, skip_serializing_if = "String::is_empty")]
150 pub shellprocconnname: String,
151 #[serde(default)]
152 pub shellprocexitcode: i32,
153 #[serde(default, skip_serializing_if = "Option::is_none")]
155 pub spawn_ts_ms: Option<i64>,
156 #[serde(default, skip_serializing_if = "is_false")]
158 pub is_agent_pane: bool,
159}
160
161pub trait Controller: Send + Sync {
166 fn start(
169 &self,
170 block_meta: MetaMapType,
171 rt_opts: Option<serde_json::Value>,
172 force: bool,
173 ) -> Result<(), String>;
174
175 fn stop(&self, graceful: bool, new_status: &str) -> Result<(), String>;
178
179 fn get_runtime_status(&self) -> BlockControllerRuntimeStatus;
181
182 fn send_input(&self, input: BlockInputUnion, seq: Option<u64>) -> Result<(), String>;
185
186 fn controller_type(&self) -> &str;
188
189 #[allow(dead_code)]
191 fn block_id(&self) -> &str;
192
193 fn as_any(&self) -> &dyn Any;
195}
196
197static CONTROLLER_REGISTRY: std::sync::LazyLock<RwLock<HashMap<String, Arc<dyn Controller>>>> =
202 std::sync::LazyLock::new(|| RwLock::new(HashMap::new()));
203
204pub fn get_controller(block_id: &str) -> Option<Arc<dyn Controller>> {
206 CONTROLLER_REGISTRY
207 .read()
208 .unwrap()
209 .get(block_id)
210 .cloned()
211}
212
213pub fn register_controller(block_id: &str, controller: Arc<dyn Controller>) {
215 let mut registry = CONTROLLER_REGISTRY.write().unwrap();
216 if let Some(old) = registry.remove(block_id) {
217 let _ = old.stop(true, STATUS_DONE);
219 }
220 registry.insert(block_id.to_string(), controller);
221}
222
223pub fn delete_controller(block_id: &str) {
226 let ctrl = CONTROLLER_REGISTRY.write().unwrap().remove(block_id);
227 if let Some(ctrl) = ctrl {
228 let _ = ctrl.stop(true, STATUS_DONE);
229 }
230 if let Some(registry) = crate::backend::process_tracker::registry::global() {
235 registry.remove(block_id);
236 }
237}
238
239pub fn get_all_controllers() -> HashMap<String, Arc<dyn Controller>> {
241 CONTROLLER_REGISTRY.read().unwrap().clone()
242}
243
244#[allow(dead_code)]
246pub fn stop_all_controllers() {
247 let controllers = get_all_controllers();
248 for (_, ctrl) in controllers {
249 let _ = ctrl.stop(true, STATUS_DONE);
250 }
251}
252
253pub fn get_block_controller_status(block_id: &str) -> Option<BlockControllerRuntimeStatus> {
258 get_controller(block_id).map(|c| c.get_runtime_status())
259}
260
261#[allow(dead_code)]
263pub fn stop_block_controller(block_id: &str) -> Result<(), String> {
264 match get_controller(block_id) {
265 Some(ctrl) => ctrl.stop(true, STATUS_DONE),
266 None => Ok(()), }
268}
269
270pub fn send_input(block_id: &str, input: BlockInputUnion, seq: Option<u64>) -> Result<(), String> {
272 match get_controller(block_id) {
273 Some(ctrl) => ctrl.send_input(input, seq),
274 None => Err(format!("no controller for block {block_id}")),
275 }
276}
277
278pub fn resync_controller(
288 block: &Block,
289 tab_id: &str,
290 rt_opts: Option<serde_json::Value>,
291 force: bool,
292 broker: Option<Arc<Broker>>,
293 event_bus: Option<Arc<EventBus>>,
294 wstore: Option<Arc<WaveStore>>,
295 filestore: Option<Arc<FileStore>>,
296) -> Result<(), String> {
297 let block_id = &block.oid;
298 let block_meta = &block.meta;
299
300 let controller_type = super::obj::meta_get_string(block_meta, META_KEY_CONTROLLER, "");
302
303 if controller_type.is_empty() {
304 return Ok(());
306 }
307
308 tracing::info!(
309 block_id = %block_id,
310 controller_type = %controller_type,
311 wstore_present = wstore.is_some(),
312 event_bus_present = event_bus.is_some(),
313 force,
314 "[dnd-debug] resync_controller entry"
315 );
316
317 let existing = get_controller(block_id);
319 if let Some(ref ctrl) = existing {
320 let needs_replace = if ctrl.controller_type() != controller_type || force {
321 true } else {
323 let status = ctrl.get_runtime_status();
324 let new_conn =
326 super::obj::meta_get_string(block_meta, META_KEY_CONNECTION, "local");
327 status.shellprocconnname != new_conn
328 };
329
330 if needs_replace {
331 let _ = ctrl.stop(true, STATUS_DONE);
332 delete_controller(block_id);
333 } else {
334 let status = ctrl.get_runtime_status();
336 tracing::info!(
337 block_id = %block_id,
338 status = %status.shellprocstatus,
339 "[dnd-debug] existing controller — skipping spawn (no cmd:cwd seed)"
340 );
341 if status.shellprocstatus == STATUS_INIT || status.shellprocstatus == STATUS_DONE {
342 return ctrl.start(block_meta.clone(), rt_opts, force);
343 }
344 return Ok(());
345 }
346 }
347
348 match controller_type.as_str() {
350 BLOCK_CONTROLLER_SHELL | BLOCK_CONTROLLER_CMD => {
351 let ctrl = shell::ShellController::new(
352 controller_type.clone(),
353 tab_id.to_string(),
354 block_id.to_string(),
355 broker,
356 event_bus,
357 wstore,
358 );
359 let ctrl = Arc::new(ctrl);
360 register_controller(block_id, ctrl.clone());
361 ctrl.start(block_meta.clone(), rt_opts, force)
362 }
363 BLOCK_CONTROLLER_SUBPROCESS => {
364 let ctrl = subprocess::SubprocessController::new(
365 tab_id.to_string(),
366 block_id.to_string(),
367 broker,
368 event_bus,
369 wstore,
370 filestore,
371 );
372 let ctrl = Arc::new(ctrl);
373 ctrl.set_self_ref();
374 register_controller(block_id, ctrl.clone());
375 ctrl.start(block_meta.clone(), rt_opts, force)
376 }
377 BLOCK_CONTROLLER_PERSISTENT => {
378 let ctrl = persistent::PersistentSubprocessController::new(
379 tab_id.to_string(),
380 block_id.to_string(),
381 broker,
382 event_bus,
383 wstore,
384 filestore,
385 );
386 let ctrl = Arc::new(ctrl);
387 register_controller(block_id, ctrl.clone());
388 ctrl.start(block_meta.clone(), rt_opts, force)
389 }
390 BLOCK_CONTROLLER_ACP => {
391 let ctrl = acp::AcpController::new(
392 tab_id.to_string(),
393 block_id.to_string(),
394 broker,
395 event_bus,
396 wstore,
397 filestore,
398 );
399 let ctrl = Arc::new(ctrl);
400 register_controller(block_id, ctrl.clone());
401 ctrl.start(block_meta.clone(), rt_opts, force)
402 }
403 BLOCK_CONTROLLER_TSUNAMI => {
404 Err("tsunami controller not yet implemented".to_string())
406 }
407 _ => Err(format!("unknown controller type: {controller_type}")),
408 }
409}
410
411pub fn publish_controller_status(
413 broker: &super::wps::Broker,
414 status: &BlockControllerRuntimeStatus,
415) {
416 use super::wps::{WaveEvent, EVENT_CONTROLLER_STATUS};
417
418 let event = WaveEvent {
419 event: EVENT_CONTROLLER_STATUS.to_string(),
420 scopes: vec![format!("block:{}", status.blockid)],
421 sender: String::new(),
422 persist: 0,
423 data: serde_json::to_value(status).ok(),
424 };
425 broker.publish(event);
426}
427
428#[cfg(test)]
429mod tests {
430 use super::*;
431
432 #[test]
433 fn test_status_constants() {
434 assert_eq!(STATUS_INIT, "init");
435 assert_eq!(STATUS_RUNNING, "running");
436 assert_eq!(STATUS_DONE, "done");
437 }
438
439 #[test]
440 fn test_controller_type_constants() {
441 assert_eq!(BLOCK_CONTROLLER_SHELL, "shell");
442 assert_eq!(BLOCK_CONTROLLER_CMD, "cmd");
443 assert_eq!(BLOCK_CONTROLLER_TSUNAMI, "tsunami");
444 }
445
446 #[test]
447 fn test_meta_key_constants() {
448 assert_eq!(META_KEY_CONTROLLER, "controller");
449 assert_eq!(META_KEY_CONNECTION, "connection");
450 assert_eq!(META_KEY_CMD, "cmd");
451 assert_eq!(META_KEY_CMD_RUN_ON_START, "cmd:runonstart");
452 }
453
454 #[test]
455 fn test_block_input_union_data() {
456 let input = BlockInputUnion::data(b"hello".to_vec());
457 assert_eq!(input.input_data.as_ref().unwrap(), b"hello");
458 assert!(input.sig_name.is_none());
459 assert!(input.term_size.is_none());
460 }
461
462 #[test]
463 fn test_block_input_union_signal() {
464 let input = BlockInputUnion::signal("SIGTERM");
465 assert!(input.input_data.is_none());
466 assert_eq!(input.sig_name.as_ref().unwrap(), "SIGTERM");
467 assert!(input.term_size.is_none());
468 }
469
470 #[test]
471 fn test_block_input_union_resize() {
472 let size = TermSize { rows: 40, cols: 120 };
473 let input = BlockInputUnion::resize(size.clone());
474 assert!(input.input_data.is_none());
475 assert!(input.sig_name.is_none());
476 let ts = input.term_size.unwrap();
477 assert_eq!(ts.rows, 40);
478 assert_eq!(ts.cols, 120);
479 }
480
481 #[test]
482 fn test_runtime_status_default() {
483 let status = BlockControllerRuntimeStatus::default();
484 assert!(status.blockid.is_empty());
485 assert_eq!(status.version, 0);
486 assert!(status.shellprocstatus.is_empty());
487 assert_eq!(status.shellprocexitcode, 0);
488 }
489
490 #[test]
491 fn test_runtime_status_serde() {
492 let status = BlockControllerRuntimeStatus {
493 blockid: "block-123".to_string(),
494 version: 3,
495 shellprocstatus: STATUS_RUNNING.to_string(),
496 shellprocconnname: "local".to_string(),
497 shellprocexitcode: 0,
498 ..Default::default()
499 };
500 let json = serde_json::to_string(&status).unwrap();
501 assert!(json.contains("\"blockid\":\"block-123\""));
502 assert!(json.contains("\"shellprocstatus\":\"running\""));
503
504 let parsed: BlockControllerRuntimeStatus = serde_json::from_str(&json).unwrap();
505 assert_eq!(parsed.blockid, "block-123");
506 assert_eq!(parsed.version, 3);
507 }
508
509 #[test]
510 fn test_get_nonexistent_controller() {
511 assert!(get_controller("nonexistent-block").is_none());
512 }
513
514 #[test]
515 fn test_get_block_controller_status_none() {
516 assert!(get_block_controller_status("nonexistent").is_none());
517 }
518
519 #[test]
520 fn test_stop_nonexistent_controller() {
521 assert!(stop_block_controller("nonexistent").is_ok());
523 }
524
525 #[test]
526 fn test_send_input_no_controller() {
527 let result = send_input("nonexistent", BlockInputUnion::data(b"test".to_vec()), None);
528 assert!(result.is_err());
529 assert!(result.unwrap_err().contains("no controller"));
530 }
531
532 #[test]
533 fn test_resync_no_controller_type() {
534 let block = Block {
535 oid: "test-block".to_string(),
536 version: 1,
537 meta: HashMap::new(),
538 ..Default::default()
539 };
540 let result = resync_controller(&block, "tab-1", None, false, None, None, None, None);
542 assert!(result.is_ok());
543 }
544
545 #[test]
546 fn test_resync_unknown_controller_type() {
547 let mut meta = MetaMapType::new();
548 meta.insert(
549 "controller".to_string(),
550 serde_json::Value::String("unknown_type".to_string()),
551 );
552 let block = Block {
553 oid: "test-block".to_string(),
554 version: 1,
555 meta,
556 ..Default::default()
557 };
558 let result = resync_controller(&block, "tab-1", None, false, None, None, None, None);
559 assert!(result.is_err());
560 assert!(result.unwrap_err().contains("unknown controller type"));
561 }
562}